81. 并发工具优于 wait 和 notify

  本书第 1 版中专门用了一个条目来说明如何正确地使用 waitnotify ( Bloch01,详见第 50 条) 。它提出的建议仍然有效,并且在本条目的最后也对此做了概述,但是这条建议现在远远没有之前那么重要了。这是因为几乎没有理由再使用 waitnotify 了。自从 Java 5 发行版本开始, Java 平台就提供了更高级的并发工具,它们可以完成以前必须在 waitnotify 上手写代码来完成的各项工作。 既然正确地使用 wait 和 notify 比较困难,就应该用更高级的并发工具来代替。

  java.util.concurrent 中更高级的工具分成三类: Executor Framework 、并发集合(Concurrent Collection)以及同步器(Synchronizer),Executor Framework 只在第 80 条中简单地提到过,并发集合和同步器将在本条目中进行简单的阐述。

  并发集合为标准的集合接口(如 ListQueueMap )提供了高性能的并发实现。为了提供高并发性,这些实现在内部自己管理同步(详见第 79 条) 。因此, 并发集合中不可能排除并发活动;将它锁定没有什么作用,只会使程序的速度变慢。

  因为无法排除并发集合中的并发活动,这意味着也无法自动地在并发集合中组成方法调用。因此,有些并发集合接口已经通过依赖状态的修改操作(state-dependent modify operation)进行了扩展,它将几个基本操作合并到了单个原子操作中。事实证明,这些操作在并发集合中已经够用,它们通过缺省方法(详见第 21 条)被加到了 Java 8 对应的集合接口中。

  例如, MapputIfAbsent(key, value) 方法,当键没有映射时会替它插入一个映射,并返回与键关联的前一个值,如果没有这样的值,则返回 null 。这样就能很容易地实现线程安全的标准 Map 了。例如,下面这个方法模拟了 String.intern 的行为:

// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public static String intern(String s) {
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}

  事实上,你还可以做得更好。ConcurrentHashMap 对获取操作(如 get)进行了优化。因此,只有当 get 表明有必要的时候,才值得先调用 get ,再调用 putIfAbsent :

// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null)
            result = s;
    }
    return result;
}

  ConcurrentHashMap 除了提供卓越的并发性之外,速度也非常快。在我的机器上,上面这个优化过的 intern 方法比 String.intern 快了不止 6 倍(但是记住, String.intern 必须使用某种弱引用,避免随着时间的推移而发生内存泄漏)。并发集合导致同步的集合大多被废弃了。比如, 应该优先使用 ConcurrentHashMap ,而不是使用 Collections.synchronizedMap 只要用并发 Map 替换同步 Map ,就可以极大地提升并发应用程序的性能。

  有些集合接口已经通过阻塞操作(blocking operation )进行了扩展,它们会一直等待(或者阻塞)到可以成功执行为止。例如, BlockingQueue 扩展了 Queue 接口,并添加了包括 take 在内的几个方法,它从队列中删除并返回了头元素,如果队列为空,就等待。这样就允许将阻塞队列用于工作队列(work queue),也称作生产者一消费者队列 (producer-consumer queue),一个或者多个生产者线程(producer thread)在工作队列中添加工作项目,并且当工作项目可用时,一个或者多个消费者线程(consumer thread )则从工作队列中取出队列并处理工作项目。不出所料,大多数 ExecutorService 实现(包括 ThreadPoolExecutor)都使用了一个 BlockingQueue(详见第 80 条) 。

  同步器(Synchronizer)是使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是 CountDownLatchSemaphore 。较不常用的是 CyclicBarrierExchanger 。功能最强大的同步器是 Phaser

  倒计数锁存器(Countdown Latch)是一次性的障碍,允许一个或者多个线程等待一个或者多个其他线程来做某些事情。CountDownLatch 的唯一构造器带有一个 int 类型的参数,这个 int 参数是指允许所有在等待的线程被处理之前,必须在锁存器上调用 countDown 方法的次数。

  要在这个简单的基本类型之上构建一些有用的东西,做起来是相当容易。例如,假设想要构建一个简单的框架,用来给一个动作的并发执行定时。这个框架中只包含单个方法,该方法带有一个执行该动作的 executor ,一个并发级别(表示要并发执行该动作的次数),以及表示该动作的 runnable 。所有的工作线程( worker thread )自身都准备好,要在 timer 线程启动时钟之前运行该动作。当最后一个工作线程准备好运行该动作时, timer 线程就「发射发令枪(fires the starting gun)」,同时允许工作线程执行该动作。一旦最后一个工作线程执行完该动作,timer 线程就立即停止计时。直接在 wait 和 notify 之上实现这个逻辑会很棍乱,而在 CountDownLatch 之上实现则相当简单:

// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
                        Runnable action) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(concurrency);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(concurrency);
    for (int i = 0; i < concurrency; i++) {
        executor.execute(() -> {
            ready.countDown();
            // Tell timer we're ready
            try {
                start.await();
                // Wait till peers are ready
                action.run();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                done.countDown();
                // Tell timer we're done
            }
        });
    }
    ready.await();
    // Wait for all workers to be ready
    long startNanos = System.nanoTime();
    start.countDown();
    // And they're off!
    done.await();
    // Wait for all workers to finish
    return System.nanoTime() - startNanos;
}

  注意这个方法使用了三个倒计数锁存器。第一个是 ready ,工作线程用它来告诉 timer 线程它们已经准备好了。然后工作线程在第二个锁存器 start 上等待。当最后一个工作线程调用 ready.countDown 时, timer 线程记录下起始时间,并调用 start.countDown 允许所有的工作线程继续进行。然后 timer 线程在第三个锁存器 done 上等待,直到最后一个工作线程运行完该动作,并调用 done.countDown 。一旦调用这个, timer 线程就会苏醒过来,并记录下结束的时间。

  还有一些细节值得注意。传递给 time 方法的 executor 必须允许创建至少与指定并发级别一样多的线程,否则这个测试就永远不会结束。这就是线程饥饿死锁(thread starvationdeadlock) [Goetz06, 8.1.1] 。如果工作线程捕捉到 InterruptedException ,就会利用习惯用法 Thread.currentThread().interrupt() 重新断言中断,并从它的 run 方法中返回。这样就允许 executor 在必要的时候处理中断,事实上也理应如此。注意,我们利用了 System.nanoTime 来给活动定时。对于间歇式的定时,始终应该优先使用 System.nanoTime ,而不是使用 System.currentTimeMillis 。因为 System.nanoTime 更准确,也更精确,它不受系统的实时时钟的调整所影响。最后,注意本例中的代码并不能进行准确的定时,除非 action 能完成一定量的工作,比如一秒或者一秒以上。众所周知,准确的微基准测试十分困难,最好在专门的框架如 jmh 的协助下进行[JMH] 。

  本条目仅仅触及了并发工具的一些皮毛。例如,前一个例子中的那三个倒计数锁存器其实可以用一个 CyclicBarrier 或者 Phaser 实例代替。这样得到的代码更加简洁,但是理解起来比较困难。虽然你始终应该优先使用并发工具,而不是使用 wait 方法和 notify 方法,但可能必须维护使用了 wait 方法和 notify 方法的遗留代码。wait 方法被用来使线程等待某个条件。它必须在同步区域内部被调用,这个同步区域将对象锁定在了调用 wait 方法的对象上。下面是使用 wait 方法的标准模式:

// The standard idiom for using the wait method
synchronized (obj) {
    while (<condition does not hold>)
    obj.wait();
    // (Releases lock, and reacquires on wakeup)
    ... // Perform action appropriate to condition
}

  始终应该使用 wait 循环模式来调用 wait 方法;永远不要在循环之外调用 wait 方法。循环会在等待之前和之后对条件进行测试。

  在等待之前测试条件,当条件已经成立时就跳过等待,这对于确保活性是必要的。如果条件已经成立,并且在线程等待之前, notify (或者 notifyAll)方法已经被调用, 则无法保证该线程总会从等待中苏醒过来。

  在等待之前测试条件,如果条件不成立的话继续等待,这对于确保安全性是必要的。当条件不成立的时候,如果线程继续执行,则可能会破坏被锁保护的约束关系。当条件不成立时,有以下理由可使一个线程苏醒过来:

  • 另一个线程可能已经得到了锁,并且从一个线程调用 notify 方法那一刻起,到等待线程苏醒过来的这段时间中,得到锁的线程已经改变了受保护的状态。
  • 条件并不成立,但是另一个线程可能意外地或恶意地调用了 notify 方法。在公有可访问的对象上等待,这些类实际上把自己暴露在了这种危险的境地中。公有可访问对象的同步方法中包含的 wait 方法都会出现这样的问题。
  • 通知线程( notifying thread )在唤醒等待线程时可能会过于「慷慨」 。例如,即使只有某些等待线程的条件已经被满足,但是通知线程可能仍然调用 notifyAll 方法。
  • 在没有通知的情况下,等待线程也可能(但很少)会苏醒过来。这被称为“伪唤醒”(spurious wakeup) [POSIX, 11.4.3.6.1; Java9-api] 。

  一个相关的话题是,为了唤醒正在等待的线程,你应该使用 notify 方法还是 notifyAll 方法(回忆一下, notify 方法唤醒的是单个正在等待的线程,假设有这样的线程存在,而 notifyAll 方法唤醒的则是所有正在等待的线程) 。一种常见的说法是,应该始终使用 notifyAll 方法。这是合理而保守的建议。它总会产生正确的结果,因为它可以保证你将会唤醒所有需要被唤醒的线程。你可能也会唤醒其他一些线程,但是这不会影响程序的正确性。这些线程醒来之后,会检查它们正在等待的条件,如果发现条件并不满足,就会继续等待。

  从优化的角度来看,如果处于等待状态的所有线程都在等待同一个条件,而每次只有一个线程可以从这个条件中被唤醒,那么你就应该选择调用 notify 方法,而不是 notifyAll 方法。

  即使这些前提条件都满足,也许还是有理由使用 notifyAll 方法而不是 notify 方法。就好像把 wait 方法调用放在一个循环中,以避免在公有可访问对象上的意外或恶意的通知一样,与此类似,使用 notifyAll 方法代替 notify 方法可以避免来自不相关线程的意外或恶意的等待。否则,这样的等待会“吞掉”一个关键的通知,使真正的接收线程无限地等待下去。

  简而言之,直接使用 wait 方法和 notify 方法就像用“并发汇编语言”进行编程一样,而 java.util.concurrent 则提供了更高级的语言。 没有理由在新代码中使用 wait 方法和 notify 方法,即使有,也是极少的。 如果你在维护使用 wait 方法和 notify 方法的代码,务必确保始终是利用标准的模式从 while 循环内部调用 wait 方法。一般情况下,应该优先使用 notifyAll 方法,而不是使用 notify 方法。如果使用 notify 方法,请一定要小心,以确保程序的活性。

results matching ""

    No results matching ""